In this notebook, we perform the following steps:
1. Install the prerequisite Python packages.
2. Run a Spark Streaming application (packaged as a jar) that collects tweets and enriches them with Watson Tone Analyzer scores.
3. Fetch the timelines of the tweeting users and score them with Watson Personality Insights.
4. Compare each user's personality profile to those of the two presidential candidates using Euclidean distance.
5. Visualize tone distributions and the top hashtags.
In [ ]:
!pip install --user python-twitter
In [ ]:
!pip install --user watson-developer-cloud
In [ ]:
!pip install --upgrade --user pixiedust
The jar file installed below contains the Spark Streaming application (written in Scala) that connects to Twitter, fetches tweets, and sends them to Watson Tone Analyzer for analysis. The resulting scores are then added to the tweets DataFrame as separate columns (a quick sanity check of that DataFrame appears after it is created below).
In [ ]:
import pixiedust
jarPath = "https://github.com/ibm-watson-data-lab/spark.samples/raw/master/dist/streaming-twitter-assembly-1.6.jar"
pixiedust.installPackage(jarPath)
print("done")
Insert your credentials for Twitter, Watson Tone Analyzer, and Watson Personality Insights, then run the following cell. Read how to provision these services and get credentials if you have not done so already.
In [ ]:
import pixiedust
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
#Set up the Twitter credentials; they are used in both the Scala and Python cells below
consumerKey = "XXXX"
consumerSecret = "XXXX"
accessToken = "XXXX"
accessTokenSecret = "XXXX"
#Set up the Watson Personality Insights credentials
piUserName = "XXXX"
piPassword = "XXXX"
#Set up the Watson Tone Analyzer credentials
taUserName = "XXXX"
taPassword = "XXXX"
In [ ]:
%%scala
val demo = com.ibm.cds.spark.samples.StreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey",consumerKey)
demo.setConfig("twitter4j.oauth.consumerSecret",consumerSecret)
demo.setConfig("twitter4j.oauth.accessToken",accessToken)
demo.setConfig("twitter4j.oauth.accessTokenSecret",accessTokenSecret)
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer/api")
demo.setConfig("watson.tone.password",taPassword)
demo.setConfig("watson.tone.username",taUserName)
import org.apache.spark.streaming._
demo.startTwitterStreaming(sc, Seconds(30)) //Run the application for a limited time
In [ ]:
%%scala
val demo = com.ibm.cds.spark.samples.StreamingTwitter
val (__sqlContext, __df) = demo.createTwitterDataFrames(sc)
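At this point, the tweets and their tone scores are available to Python as the __df DataFrame (and the corresponding SQLContext as __sqlContext). As an optional sanity check (a minimal sketch; the exact tone column names come back from Watson Tone Analyzer), you can confirm that the tone columns were appended:
In [ ]:
#Optional sanity check: the schema should show the original tweet fields
#plus one column per tone score appended by the streaming application
__df.printSchema()
print(__df.count())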
In [ ]:
import pyspark.sql.functions as F
usersDF = __df.groupby("author", "userid").agg(F.avg("Anger").alias("Anger"), F.avg("Disgust").alias("Disgust"))
usersDF.show()
In [ ]:
import twitter
api = twitter.Api(consumer_key=consumerKey,
                  consumer_secret=consumerSecret,
                  access_token_key=accessToken,
                  access_token_secret=accessTokenSecret)
#print(api.VerifyCredentials())
In [ ]:
def getTweets(screenName):
    statuses = api.GetUserTimeline(screen_name=screenName,
                                   since_id=None,
                                   max_id=None,
                                   count=200,
                                   include_rts=False,
                                   trim_user=False,
                                   exclude_replies=True)
    return statuses

#Note: the inner comprehension variable is named t so it does not shadow the lambda argument s
usersWithTweetsRDD = usersDF.flatMap(
    lambda s: [(t.user.screen_name, t.text.encode('ascii', 'ignore')) for t in getTweets(s['userid'])])
print(usersWithTweetsRDD.count())
In [ ]:
import re
usersWithTweetsRDD2 = usersWithTweetsRDD.map(lambda s: (s[0], s[1])).reduceByKey(lambda s, t: s + '\n' + t)\
    .filter(lambda s: len(re.findall(r'\w+', s[1])) > 100)
print(usersWithTweetsRDD2.count())
#usersWithTweetsRDD2.take(2)
Watson Personality Insights requires at least 100 words from its lexicon, which may not be available for every user. This is why the getPersonalityInsight helper function guards against exceptions when calling Watson PI: if an exception occurs, an empty array is returned, and every record with an empty array is then filtered out of the resulting RDD.
Note also that we use broadcast variables to propagate the username and password to the cluster.
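As a refresher, a broadcast variable ships a read-only value to each executor once, instead of re-serializing it with every task, which is why it suits credentials that every task needs. A minimal, self-contained sketch (the value here is illustrative):
In [ ]:
#Illustrative broadcast example: ship a read-only value to the executors once
bc = sc.broadcast({"user": "alice"})
#Tasks read the broadcast value through .value
print(sc.parallelize(range(3)).map(lambda x: (x, bc.value["user"])).collect())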
In [ ]:
from pyspark.sql.types import *
from watson_developer_cloud import PersonalityInsightsV3
broadCastPIUsername = sc.broadcast(piUserName)
broadCastPIPassword = sc.broadcast(piPassword)
def getPersonalityInsight(text, schema=False):
    personality_insights = PersonalityInsightsV3(
        version='2016-10-20',
        username=broadCastPIUsername.value,
        password=broadCastPIPassword.value)
    try:
        p = personality_insights.profile(
            text, content_type='text/plain',
            raw_scores=True, consumption_preferences=True)
        if schema:
            return \
                [StructField(t['name'], FloatType()) for t in p["needs"]] + \
                [StructField(t['name'], FloatType()) for t in p["values"]] + \
                [StructField(t['name'], FloatType()) for t in p['personality']]
        else:
            return \
                [t['raw_score'] for t in p["needs"]] + \
                [t['raw_score'] for t in p["values"]] + \
                [t['raw_score'] for t in p['personality']]
    except:
        return []
usersWithPIRDD = usersWithTweetsRDD2.map(lambda s: [s[0]] + getPersonalityInsight(s[1])).filter(lambda s: len(s)>1)
print(usersWithPIRDD.count())
#usersWithPIRDD.take(2)
In [ ]:
#Convert to a DataFrame
schema = StructType(
    [StructField('userid', StringType())] + getPersonalityInsight(usersWithTweetsRDD2.take(1)[0][1], schema=True)
)
usersWithPIDF = sqlContext.createDataFrame(usersWithPIRDD, schema)
usersWithPIDF.cache()
display(usersWithPIDF)
For a quick look at the differences in Personality Insights scores, Spark provides a describe() function that computes the mean and standard deviation of DataFrame columns. Compare the scores of the Twitter users with those of the presidential candidates.
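Note that describe() accepts several columns at once and returns a small DataFrame of summary statistics (count, mean, stddev, min, max), so the five separate calls a few cells below could also be collapsed into one. A sketch, assuming the same Big Five column names:
In [ ]:
#Summarize all five personality dimensions in a single call
usersWithPIDF.describe(['Emotional range', 'Agreeableness', 'Extraversion',
                        'Conscientiousness', 'Openness']).show()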
In [ ]:
candidates = "realDonaldTrump HillaryClinton".split(" ")
candidatesRDD = sc.parallelize(candidates)\
    .flatMap(lambda s: [(t.user.screen_name, t.text.encode('ascii', 'ignore')) for t in getTweets(s)])\
    .map(lambda s: (s[0], s[1]))\
    .reduceByKey(lambda s, t: s + '\n' + t)\
    .filter(lambda s: len(re.findall(r'\w+', s[1])) > 100)\
    .map(lambda s: [s[0]] + getPersonalityInsight(s[1]))
candidatesPIDF = sqlContext.createDataFrame(candidatesRDD, schema)
In [ ]:
c = candidatesPIDF.collect()
broadCastTrumpPI = sc.broadcast(c[0][1:])
broadCastHillaryPI = sc.broadcast(c[1][1:])
In [ ]:
display(candidatesPIDF)
In [ ]:
candidatesPIDF.select('userid','Emotional range','Agreeableness', 'Extraversion','Conscientiousness', 'Openness').show()
usersWithPIDF.describe(['Emotional range']).show()
usersWithPIDF.describe(['Agreeableness']).show()
usersWithPIDF.describe(['Extraversion']).show()
usersWithPIDF.describe(['Conscientiousness']).show()
usersWithPIDF.describe(['Openness']).show()
In [ ]:
import numpy as np
from pyspark.sql.types import Row
def addEuclideanDistance(s):
    #Use d rather than dict so we do not shadow the builtin
    d = s.asDict()
    def getEuclideanDistance(a, b):
        return np.linalg.norm(np.array(a) - np.array(b)).item()
    d["distDonaldTrump"] = getEuclideanDistance(s[1:], broadCastTrumpPI.value)
    d["distHillary"] = getEuclideanDistance(s[1:], broadCastHillaryPI.value)
    d["closerHillary"] = "Yes" if d["distHillary"] < d["distDonaldTrump"] else "No"
    return Row(**d)
#add euclidean distances to Trump and Hillary
euclideanDF = sqlContext.createDataFrame(usersWithPIDF.map(lambda s: addEuclideanDistance(s)))
#Reorder columns to have userid and distances first
cols = euclideanDF.columns
reorderCols = ["userid","distHillary","distDonaldTrump", "closerHillary"]
euclideanDF = euclideanDF.select(reorderCols + [x for x in cols if x not in reorderCols])
#PixieDust display
#To visualize the distribution, select the bar chart display, use closerHillary as both key and value, and set aggregation=count
display(euclideanDF)
In [ ]:
tweets=__df
tweets.count()
display(tweets)
In [ ]:
#create an array that will hold the count for each sentiment
sentimentDistribution=[0] * 13
#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%
#Store the data in the array
for i, sentiment in enumerate(tweets.columns[-13:]):
    sentimentDistribution[i] = __sqlContext.sql("SELECT count(*) as sentCount FROM tweets where " + sentiment + " > 60")\
        .collect()[0].sentCount
In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
ind=np.arange(13)
width = 0.35
bar = plt.bar(ind, sentimentDistribution, width, color='g', label = "distributions")
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )
plt.ylabel('Tweet count')
plt.xlabel('Tone')
plt.title('Distribution of tweets by sentiments > 60%')
plt.xticks(ind+width, tweets.columns[-13:])
plt.legend()
plt.show()
In [ ]:
from operator import add
import re
tagsRDD = tweets.flatMap(lambda t: re.split(r"\s", t.text))\
    .filter(lambda word: word.startswith("#"))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add, 10).map(lambda (a, b): (b, a)).sortByKey(False).map(lambda (a, b): (b, a))
top10tags = tagsRDD.take(10)
In [ ]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2, plSize[1]*2) )
labels = [i[0] for i in top10tags]
sizes = [int(i[1]) for i in top10tags]
colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', "beige", "paleturquoise", "pink", "lightyellow", "coral"]
plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)
plt.axis('equal')
plt.show()
In [ ]:
cols = tweets.columns[-13:]
def expand(t):
    ret = []
    for s in [i[0] for i in top10tags]:
        if s in t.text:
            for tone in cols:
                ret += [s.replace(':', '').replace('-', '') + u"-" + unicode(tone) + ":" + unicode(getattr(t, tone))]
    return ret

def makeList(l):
    return l if isinstance(l, list) else [l]
#Create RDD from tweets dataframe
tagsRDD = tweets.map(lambda t: t )
#Filter to only keep the entries that are in top10tags
tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )
#Create a flatMap using the expand function defined above, this will be used to collect all the scores
#for a particular tag with the following format: Tag-Tone-ToneScore
tagsRDD = tagsRDD.flatMap( expand )
#Create a map indexed by Tag-Tone keys
tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(":")[0], float( fullTag.split(":")[1]) ))
#Call combineByKey to format the data as follows:
#Key=Tag-Tone
#Value=(sum_of_all_scores_for_this_tone, count)
tagsRDD = tagsRDD.combineByKey((lambda x: (x, 1)),
                               (lambda x, y: (x[0] + y, x[1] + 1)),
                               (lambda x, y: (x[0] + y[0], x[1] + y[1])))
#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple
#Key=Tag
#Value=(Tone, average_score)
tagsRDD = tagsRDD.map(lambda (key, ab): (key.split("-")[0], (key.split("-")[1], round(ab[0]/ab[1], 2))))
#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples
tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )
#Sort the (Tone,average_score) tuples alphabetically by Tone
tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )
#Format the data as expected by the plotting code in the next cell.
#map the values to a tuple as follows: ([list of tones], [list of average scores])
#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])
tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x]) )
#Use custom sort function to sort the entries by order of appearance in top10tags
def customCompare(key):
    for (k, v) in top10tags:
        if k == key:
            return v
    return 0
tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)
#Take the mean tone scores for the top 10 tags
top10tagsMeanScores = tagsRDD.take(10)
In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*3, plSize[1]*2) )
top5tagsMeanScores = top10tagsMeanScores[:5]
width = 0
ind=np.arange(13)
(a,b) = top5tagsMeanScores[0]
labels=b[0]
colors = ["beige", "paleturquoise", "pink", "lightyellow", "coral", "lightgreen", "gainsboro", "aquamarine","c"]
idx=0
for key, value in top5tagsMeanScores:
    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)
    width += 0.15
    idx += 1
plt.xticks(ind+0.3, labels)
plt.ylabel('AVERAGE SCORE')
plt.xlabel('TONES')
plt.title('Breakdown of top hashtags by sentiment tones')
plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode="expand", borderaxespad=0.)
plt.show()
In [ ]:
%%scala
val demo = com.ibm.cds.spark.samples.PixiedustStreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey",consumerKey)
demo.setConfig("twitter4j.oauth.consumerSecret",consumerSecret)
demo.setConfig("twitter4j.oauth.accessToken",accessToken)
demo.setConfig("twitter4j.oauth.accessTokenSecret",accessTokenSecret)
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer/api")
demo.setConfig("watson.tone.password",taPassword)
demo.setConfig("watson.tone.username",taUserName)
demo.setConfig("checkpointDir", System.getProperty("user.home") + "/pixiedust/ssc")
In [ ]:
!pip install --upgrade --user pixiedust-twitterdemo
In [ ]:
from pixiedust_twitterdemo import *
twitterDemo()
In [ ]:
display(__tweets)
In [ ]:
from pyspark.sql import Row
from pyspark.sql.types import *
emotions=__tweets.columns[-13:]
distrib = __tweets.flatMap(lambda t: [(x, t[x]) for x in emotions]).filter(lambda t: t[1] > 60)\
    .toDF(StructType([StructField('emotion', StringType()), StructField('score', DoubleType())]))
display(distrib)
In [ ]:
__tweets.registerTempTable("pixiedust_tweets")
#create an array that will hold the count for each sentiment
sentimentDistribution=[0] * 13
#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%
#Store the data in the array
for i, sentiment in enumerate(__tweets.columns[-13:]):
    sentimentDistribution[i] = sqlContext.sql("SELECT count(*) as sentCount FROM pixiedust_tweets where " + sentiment + " > 60")\
        .collect()[0].sentCount
In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
ind=np.arange(13)
width = 0.35
bar = plt.bar(ind, sentimentDistribution, width, color='g', label = "distributions")
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )
plt.ylabel('Tweet count')
plt.xlabel('Tone')
plt.title('Distribution of tweets by sentiments > 60%')
plt.xticks(ind+width, __tweets.columns[-13:])
plt.legend()
plt.show()
In [ ]:
from operator import add
import re
tagsRDD = __tweets.flatMap(lambda t: re.split(r"\s", t.text))\
    .filter(lambda word: word.startswith("#"))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add, 10).map(lambda (a, b): (b, a)).sortByKey(False).map(lambda (a, b): (b, a))
top10tags = tagsRDD.take(10)
In [ ]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2, plSize[1]*2) )
labels = [i[0] for i in top10tags]
sizes = [int(i[1]) for i in top10tags]
colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', "beige", "paleturquoise", "pink", "lightyellow", "coral"]
plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)
plt.axis('equal')
plt.show()
In [ ]:
cols = __tweets.columns[-13:]
def expand(t):
    ret = []
    for s in [i[0] for i in top10tags]:
        if s in t.text:
            for tone in cols:
                ret += [s.replace(':', '').replace('-', '') + u"-" + unicode(tone) + ":" + unicode(getattr(t, tone))]
    return ret

def makeList(l):
    return l if isinstance(l, list) else [l]
#Create RDD from tweets dataframe
tagsRDD = __tweets.map(lambda t: t )
#Filter to only keep the entries that are in top10tags
tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )
#Create a flatMap using the expand function defined above, this will be used to collect all the scores
#for a particular tag with the following format: Tag-Tone-ToneScore
tagsRDD = tagsRDD.flatMap( expand )
#Create a map indexed by Tag-Tone keys
tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(":")[0], float( fullTag.split(":")[1]) ))
#Call combineByKey to format the data as follows:
#Key=Tag-Tone
#Value=(sum_of_all_scores_for_this_tone, count)
tagsRDD = tagsRDD.combineByKey((lambda x: (x, 1)),
                               (lambda x, y: (x[0] + y, x[1] + 1)),
                               (lambda x, y: (x[0] + y[0], x[1] + y[1])))
#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple
#Key=Tag
#Value=(Tone, average_score)
tagsRDD = tagsRDD.map(lambda (key, ab): (key.split("-")[0], (key.split("-")[1], round(ab[0]/ab[1], 2))))
#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples
tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )
#Sort the (Tone,average_score) tuples alphabetically by Tone
tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )
#Format the data as expected by the plotting code in the next cell.
#map the values to a tuple as follows: ([list of tones], [list of average scores])
#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])
tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x]) )
#Use custom sort function to sort the entries by order of appearance in top10tags
def customCompare(key):
    for (k, v) in top10tags:
        if k == key:
            return v
    return 0
tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)
#Take the mean tone scores for the top 10 tags
top10tagsMeanScores = tagsRDD.take(10)
In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*3, plSize[1]*2) )
top5tagsMeanScores = top10tagsMeanScores[:5]
width = 0
ind=np.arange(13)
(a,b) = top5tagsMeanScores[0]
labels=b[0]
colors = ["beige", "paleturquoise", "pink", "lightyellow", "coral", "lightgreen", "gainsboro", "aquamarine","c"]
idx=0
for key, value in top5tagsMeanScores:
    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)
    width += 0.15
    idx += 1
plt.xticks(ind+0.3, labels)
plt.ylabel('AVERAGE SCORE')
plt.xlabel('TONES')
plt.title('Breakdown of top hashtags by sentiment tones')
plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode="expand", borderaxespad=0.)
plt.show()